package com.shao.demo.canalclient.scheduling;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.Message;

import com.shao.demo.canalclient.event.InsertCanalEvent;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * @author zhiqi.shao
 * @Date 2018/6/6 09:22
 */

@Slf4j
@Component
public class CanalScheduling implements Runnable, ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Resource
    private CanalConnector canalConnector;

    @Scheduled(fixedDelay = 100)
    @Override
    public void run() {
        try {
            //获取指定数量的数据
            int batchSize = 1000;
            Message message = canalConnector.getWithoutAck(batchSize);
            // 数据批号
            long batchId = message.getId();
            log.debug("scheduled_batchId=" + batchId);
            try {
                List<Entry> entries = message.getEntries();
                if (batchId != -1 && entries.size() > 0) {
                    entries.forEach(entry -> {
                        if (entry.getEntryType() == EntryType.ROWDATA) {
                            //事件发布
                            publishCanalEvent(entry);
                        }
                    });
                }
                canalConnector.ack(batchId);// 提交确认
            } catch (Exception e) {
                log.info("发送监听事件失败！batchId回滚,batchId=" + batchId, e);
                //这次回滚后下次激活会继续收到这个binlog推送
                canalConnector.rollback(batchId);
            }
        } catch (Exception e) {
            log.error("canal_scheduled异常！", e);
        }
    }

    private void publishCanalEvent(Entry entry) {
        EventType eventType = entry.getHeader().getEventType();
        switch (eventType) {
            case INSERT:
                applicationContext.publishEvent(new InsertCanalEvent(entry));
                break;
            case UPDATE:
                //todo UPDATE  方案选择
                handleEntry(entry);
                break;
            case DELETE:
                //todo delete
                handleEntry(entry);
                break;
            default:
                break;
        }
    }

    /**
     * canal的Entry对象，里面有我们需要的数据
     * @param entry
     */
    private void handleEntry(Entry entry) {
        CanalEntry.RowChange rowChage;
        try {
            rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
        }

        EventType eventType = rowChage.getEventType();
        log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                eventType));

        for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                log.info("-------> before");
                printColumn(rowData.getBeforeColumnsList());
                log.info("-------> after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }


    private void printColumn(@NotNull List<CanalEntry.Column> columns) {
        Map<String, Object> columnsToMap = parseColumnsToMap(columns);
        log.info("-------> columns to json:{}", JSON.toJSON(columnsToMap));
    }


    private Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
        Map<String, Object> jsonMap = new HashMap<>();
        columns.forEach(column -> {
            if (column == null) {
                return;
            }
            jsonMap.put(column.getName(), column.getValue());
        });
        return jsonMap;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
